package redis_stream

import (
	"encoding/json"
	"fmt"
	"math"
	"time"
)

const (
	defaultGroup         = "redis_stream"
	defaultConsumerCount = 10
	defaultFetchCount    = 10
	defaultIdle          = time.Minute * 10
)

type ConsumerOption struct {
	ConsumerCount int
	FetchCount    int64
	StreamMaxLen  int64
	IsDelay       bool
}

type ConsumerGroup struct {
	client   *RedisStream
	option   *ConsumerOption
	stream   string
	callback ConsumerCallback
}

type ci interface {
	Run(group *ConsumerGroup) error
}

type Payload struct {
	ID      string `json:"id"`
	Payload any    `json:"payload"`
	Stream  string `json:"stream"`
	buf interface{}
}

func (p *Payload) MarshalBinary() (data []byte, err error) {
	return json.Marshal(p)
}

func (p *Payload) UnmarshalBinary(data []byte) error {
	return json.Unmarshal(data, p)
}

func (r *RedisStream) CreateConsumerGroup(stream string, option *ConsumerOption) *ConsumerGroup {
	if option == nil {
		option = &ConsumerOption{
			ConsumerCount: defaultConsumerCount,
			FetchCount:    defaultFetchCount,
			StreamMaxLen:  math.MaxInt,
		}
	}

	return &ConsumerGroup{
		client: r,
		option: option,
		stream: stream,
	}
}

type ConsumerCallback func(message *Payload) error

func (c *ConsumerGroup) Callback(callback ConsumerCallback) {
	c.callback = callback
}

func (c *ConsumerGroup) Listen() error {
	if c.option.ConsumerCount == 0 {
		c.option.ConsumerCount = defaultConsumerCount
	}
	if c.option.FetchCount == 0 {
		c.option.FetchCount = defaultFetchCount
	}

	var consumer ci
	if !c.option.IsDelay {
		consumer = streamC{}
	} else {
		consumer = streamD{}
	}
	return consumer.Run(c)
}

func (c *ConsumerGroup) nextConsumerId(current int) string {
	next := (current % c.option.ConsumerCount) + 1
	return fmt.Sprintf("consumer-%d", next)
}
